/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.indices.pollingingest.mappers;

import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.Message;
import org.opensearch.indices.pollingingest.IngestionUtils;
import org.opensearch.indices.pollingingest.ShardUpdateMessage;

import java.util.Map;

import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.ID;
import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.OP_TYPE;
import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.SOURCE;

/**
 * Mapper implementation that uses the entire message payload as the document source.
 * The message pointer (Kafka offset, Kinesis sequence number, etc) along with shardID is used as the document ID, and the operation type is "index".
 * For the document ID, the shard pointer is prefixed with the shard ID (as {@code <shardId>-<pointer>}) to ensure uniqueness across all shards.
 *
 * <p> Note that raw payload will not support document versioning. Eventually consistent view of the documents can be expected
 * if there is a shard recovery resulting in message replay.
 */
public class RawPayloadIngestionMessageMapper implements IngestionMessageMapper {

    private static final String OP_TYPE_INDEX = "index";
    private final int shardId;

    /**
     * Creates a mapper bound to a single shard.
     *
     * @param shardId shard ID that is prepended to every document ID this mapper generates
     */
    public RawPayloadIngestionMessageMapper(int shardId) {
        this.shardId = shardId;
    }

    /**
     * Builds a {@link ShardUpdateMessage} whose payload map carries three entries: the generated document ID,
     * the fixed "index" operation type, and the parsed message payload as the document source.
     *
     * @param pointer position of the message in the stream; its string form becomes part of the document ID
     * @param message ingested message whose payload is cast to {@code byte[]} and parsed into the source map
     * @return the update message wrapping the pointer, the original message and the constructed payload map,
     *         with the auto-generated ID timestamp left unset
     * @throws IllegalArgumentException declared by the mapper contract; presumably raised when the payload
     *         cannot be parsed (the parsing helper is not visible here)
     */
    @Override
    public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException {
        // The raw payload bytes are parsed as-is and become the _source content.
        final byte[] rawPayload = (byte[]) message.getPayload();
        final Map<String, Object> parsedSource = IngestionUtils.getParsedPayloadMap(rawPayload);

        // Metadata fields and the source travel together in a single immutable map.
        final Map<String, Object> envelope = Map.of(ID, documentIdFor(pointer), OP_TYPE, OP_TYPE_INDEX, SOURCE, parsedSource);

        // The ID is derived from the pointer, so there is no auto-generated ID timestamp to record.
        return new ShardUpdateMessage(pointer, message, envelope, UNSET_AUTO_GENERATED_TIMESTAMP);
    }

    /**
     * Derives the document ID as {@code <shardId>-<pointer>} so that IDs stay unique across shards.
     */
    private String documentIdFor(IngestionShardPointer pointer) {
        return shardId + "-" + pointer.asString();
    }
}
